package com.chukun.flink.table.helloworld;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

/**
 * @author chukun
 * @version 1.0.0
 * @description table helloworld操作
 * @createTime 2022年05月29日 15:31:00
 */
public class StreamTableHelloWorld {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 加载数据源
        DataStreamSource<Long> dataStream = env.fromCollection(Arrays.asList(1L, 2L, 3L, 4L));

        // dataStream转换为table
        Table inputTable = tableEnv.fromDataStream(dataStream);

        // 将 dataStream注册为临时表，表名为 helloworld
        tableEnv.createTemporaryView("helloworld", inputTable);

        // 使用sql查询helloworld的数据
        Table result = tableEnv.sqlQuery("select * from helloworld");

        DataStream<Row> rowDataStream = tableEnv.toDataStream(result);

        // 将查询的数据转换为数据流
        rowDataStream.print("StreamTableEnvironment");

        // 触发flink作业
        env.execute("StreamTableHelloWorld");

    }
}
